import copy
import inspect
from typing import Any, Callable, Dict, Iterable, Optional

from triad import extension_method
from triad.collections.function_wrapper import (
    AnnotatedParam,
    FunctionWrapper,
    function_wrapper,
)
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import get_caller_global_local_vars, to_function

from fugue.constants import FUGUE_ENTRYPOINT
from fugue.exceptions import FugueInterfacelessError
from fugue.workflow.workflow import FugueWorkflow, WorkflowDataFrame, WorkflowDataFrames


def module(
    func: Optional[Callable] = None,
    as_method: bool = False,
    name: Optional[str] = None,
    on_dup: str = "overwrite",
) -> Any:
    """Turn a function into a Fugue module.

    Usable both bare (``@module``) and with arguments (``@module(...)``).
    Please read :doc:`Module Tutorial <tutorial:tutorials/module>`

    :param func: the function to wrap; ``None`` when the decorator is applied
      with arguments, in which case a decorator is returned instead
    :param as_method: when True, ``func`` is additionally handed to
      ``extension_method``
    :param name: forwarded to ``extension_method`` as ``name``
    :param on_dup: forwarded to ``extension_method`` as ``on_dup``
    :return: a ``_ModuleFunctionWrapper`` around ``func``, or a decorator
      that produces one
    """
    if func is not None:  # @module
        wrapped = _ModuleFunctionWrapper(func)
        if as_method:
            # The undecorated function (not the wrapper) is what gets
            # registered; the wrapper is built first so an invalid signature
            # fails before any registration happens.
            extension_method(func, name=name, on_dup=on_dup)
        return wrapped

    # @module(...): defer until the decorated function is supplied
    def _decorate(func: Callable) -> Any:
        return module(func, as_method=as_method, name=name, on_dup=on_dup)

    return _decorate


def _to_module(
    obj: Any,
    global_vars: Optional[Dict[str, Any]] = None,
    local_vars: Optional[Dict[str, Any]] = None,
) -> "_ModuleFunctionWrapper":
    """Convert ``obj`` into a ``_ModuleFunctionWrapper``.

    :param obj: an existing wrapper (returned unchanged) or anything
      ``to_function`` can resolve to a callable
    :param global_vars: global scope forwarded to ``to_function``
    :param local_vars: local scope forwarded to ``to_function``
    :raises FugueInterfacelessError: if resolving or wrapping ``obj`` fails;
      the underlying exception is both the second argument and the
      ``__cause__`` of the raised error
    :return: the module wrapper for ``obj``
    """
    if isinstance(obj, _ModuleFunctionWrapper):
        return obj
    # NOTE(review): kept inline in this function on purpose -- the helper
    # presumably inspects the call stack to find the caller's scopes, so
    # moving it into a nested helper could change what it sees. Confirm
    # before refactoring.
    global_vars, local_vars = get_caller_global_local_vars(global_vars, local_vars)
    try:
        f = to_function(obj, global_vars=global_vars, local_vars=local_vars)
        # this is for string expression of function with decorator
        if isinstance(f, _ModuleFunctionWrapper):
            # a shallow copy is returned rather than the resolved object itself
            return copy.copy(f)
        # this is for functions without decorator
        return _ModuleFunctionWrapper(f)
    except Exception as e:
        # Raise inside the handler and chain explicitly so the traceback
        # shows the root failure instead of dropping it.
        raise FugueInterfacelessError(f"{obj} is not a valid module", e) from e


@function_wrapper(FUGUE_ENTRYPOINT)
class _ModuleFunctionWrapper(FunctionWrapper):
    """``FunctionWrapper`` specialisation for Fugue module functions.

    Calling the wrapper calls the wrapped function. When the function's
    first parameter is annotated as a workflow and the caller did not pass
    one, the workflow is inferred from the dataframe arguments and inserted
    as the first positional argument.
    """

    def __init__(  # pylint: disable-all
        self,
        func: Callable,
        params_re: str = "^(w?(u|v+)|w(u?|v*))x*z?$",
        return_re: str = "^[uvn]$",
    ):
        # "w", "u" and "v" are the codes registered on this wrapper for
        # FugueWorkflow, WorkflowDataFrames and WorkflowDataFrame.
        # NOTE(review): "x", "z" and "n" are not defined in this file --
        # presumably codes built into the base wrapper; confirm.
        super().__init__(func, params_re, return_re)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Invoke the wrapped function, injecting the workflow if needed.

        :raises ValueError: if a workflow must be injected but none can be
          inferred from the arguments
        """
        if not self._need_add_workflow(*args, **kwargs):
            return super().__call__(*args, **kwargs)
        workflow = self._infer_workflow(*args, **kwargs)
        assert_or_throw(workflow is not None, ValueError("can't infer workflow"))
        return super().__call__(workflow, *args, **kwargs)

    @property
    def has_input(self) -> bool:
        """Whether any parameter is a dataframe or dataframes parameter."""
        frame_params = (_WorkflowDataFrameParam, _WorkflowDataFramesParam)
        for param in self._params.values():
            if isinstance(param, frame_params):
                return True
        return False

    @property
    def has_dfs_input(self) -> bool:
        """Whether any parameter is a ``WorkflowDataFrames`` parameter."""
        for param in self._params.values():
            if isinstance(param, _WorkflowDataFramesParam):
                return True
        return False

    @property
    def has_no_output(self) -> bool:
        """Whether the return annotation is neither dataframe kind."""
        frame_params = (_WorkflowDataFrameParam, _WorkflowDataFramesParam)
        returns_frames = isinstance(self._rt, frame_params)
        return not returns_frames

    @property
    def has_single_output(self) -> bool:
        """Whether the return annotation is a single ``WorkflowDataFrame``."""
        return isinstance(self._rt, _WorkflowDataFrameParam)

    @property
    def has_multiple_output(self) -> bool:
        """Whether the return annotation is ``WorkflowDataFrames``."""
        return isinstance(self._rt, _WorkflowDataFramesParam)

    @property
    def _first_annotation_is_workflow(self) -> bool:
        """Whether the first parameter is annotated as a workflow."""
        leading = self._params.get_value_by_index(0)
        return isinstance(leading, _FugueWorkflowParam)

    def _need_add_workflow(self, *args: Any, **kwargs: Any):
        """Decide whether ``__call__`` has to supply the workflow itself.

        True only when the function expects a workflow first and the caller
        provided it neither by keyword nor as the first positional argument.
        """
        if not self._first_annotation_is_workflow:
            return False
        workflow_key = self._params.get_key_by_index(0)
        given_by_name = workflow_key in kwargs
        given_by_position = len(args) > 0 and isinstance(args[0], FugueWorkflow)
        return not (given_by_name or given_by_position)

    def _infer_workflow(self, *args: Any, **kwargs: Any) -> Optional[FugueWorkflow]:
        """Find the single workflow that all dataframe arguments belong to.

        Positional arguments are scanned first, then keyword values.

        :raises ValueError: if a ``WorkflowDataFrames`` holds a value that
          is not a ``WorkflowDataFrame``, or if the dataframes do not all
          share the same workflow object
        :return: the shared workflow, or ``None`` when no dataframe
          argument was found
        """

        def iter_frames() -> Iterable[Any]:
            # Lazily flatten the arguments into individual dataframes so
            # the checks run in encounter order and stop at the first
            # failure.
            for candidate in (*args, *kwargs.values()):
                if isinstance(candidate, WorkflowDataFrame):
                    yield candidate
                elif isinstance(candidate, WorkflowDataFrames):
                    for key, member in candidate.items():
                        assert_or_throw(
                            isinstance(member, WorkflowDataFrame),
                            lambda: ValueError(
                                f"{key}:{member} is not a WorkflowDataFrame"
                            ),
                        )
                        yield member

        found: Optional[FugueWorkflow] = None
        for frame in iter_frames():
            # identity comparison: every frame must point at the same object
            assert_or_throw(
                found is None or frame.workflow is found,
                ValueError("different parenet workflows found on input dataframes"),
            )
            found = frame.workflow
        return found


def _is_workflow_class(annotation: Any) -> bool:
    """Tell whether ``annotation`` is ``FugueWorkflow`` or a subclass of it."""
    return inspect.isclass(annotation) and issubclass(annotation, FugueWorkflow)


@_ModuleFunctionWrapper.annotated_param(
    FugueWorkflow,
    "w",
    matcher=_is_workflow_class,
)
class _FugueWorkflowParam(AnnotatedParam):
    """Parameter kind for ``FugueWorkflow`` annotations, registered as "w"."""


@_ModuleFunctionWrapper.annotated_param(WorkflowDataFrame, "v")
class _WorkflowDataFrameParam(AnnotatedParam):
    """Parameter kind for ``WorkflowDataFrame`` annotations, registered as "v"."""


@_ModuleFunctionWrapper.annotated_param(WorkflowDataFrames, "u")
class _WorkflowDataFramesParam(AnnotatedParam):
    """Parameter kind for ``WorkflowDataFrames`` annotations, registered as "u"."""
